{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Run custom functions \n",
    "\n",
    "You can use arbitrary functions in the pipeline.\n",
    "\n",
    "您可以在管道中使用任意函数。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "AIMessage(content='3 + 9 equals 12.', response_metadata={'token_usage': {'completion_tokens': 8, 'prompt_tokens': 14, 'total_tokens': 22}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-6a11672e-fb0a-41eb-8088-7eec0232ca87-0', usage_metadata={'input_tokens': 14, 'output_tokens': 8, 'total_tokens': 22})"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from operator import itemgetter\n",
    "\n",
    "from langchain_core.prompts import ChatPromptTemplate\n",
    "from langchain_core.runnables import RunnableLambda\n",
    "from langchain_openai import ChatOpenAI\n",
    "\n",
    "\n",
    "def length_function(text):\n",
    "    return len(text)\n",
    "\n",
    "\n",
    "def _multiple_length_function(text1, text2):\n",
    "    return len(text1) * len(text2)\n",
    "\n",
    "\n",
    "def multiple_length_function(_dict):\n",
    "    return _multiple_length_function(_dict[\"text1\"], _dict[\"text2\"])\n",
    "\n",
    "\n",
    "prompt = ChatPromptTemplate.from_template(\"what is {a} + {b}\")\n",
    "model = ChatOpenAI()\n",
    "\n",
    "chain1 = prompt | model\n",
    "\n",
    "chain = (\n",
    "    {\n",
    "        \"a\": itemgetter(\"foo\") | RunnableLambda(length_function),\n",
    "        \"b\": {\"text1\": itemgetter(\"foo\"), \"text2\": itemgetter(\"bar\")}\n",
    "        | RunnableLambda(multiple_length_function),\n",
    "    }\n",
    "    | prompt\n",
    "    | model\n",
    ")\n",
    "\n",
    "chain.invoke({\"foo\": \"bar\", \"bar\": \"gah\"})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Accepting a Runnable Config\n",
    "\n",
    "Runnable lambdas can optionally accept a RunnableConfig, which they can use to pass callbacks, tags, and other configuration information to nested runs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'foo': 'bar'}\n",
      "Tokens Used: 62\n",
      "\tPrompt Tokens: 56\n",
      "\tCompletion Tokens: 6\n",
      "Successful Requests: 1\n",
      "Total Cost (USD): $3.7e-05\n"
     ]
    }
   ],
   "source": [
    "from langchain_core.output_parsers import StrOutputParser\n",
    "from langchain_core.runnables import RunnableConfig\n",
    "from langchain_community.callbacks import get_openai_callback\n",
    "import json\n",
    "\n",
    "\n",
    "def parse_or_fix(text: str, config: RunnableConfig):\n",
    "    fixing_chain = (\n",
    "        ChatPromptTemplate.from_template(\n",
    "            \"Fix the following text:\\n\\n```text\\n{input}\\n```\\nError: {error}\"\n",
    "            \" Don't narrate, just respond with the fixed data.\"\n",
    "        )\n",
    "        | ChatOpenAI()\n",
    "        | StrOutputParser()\n",
    "    )\n",
    "    for _ in range(3):\n",
    "        try:\n",
    "            return json.loads(text)\n",
    "        except Exception as e:\n",
    "            text = fixing_chain.invoke({\"input\": text, \"error\": e}, config)\n",
    "    return \"Failed to parse\"\n",
    "\n",
    "\n",
    "with get_openai_callback() as cb:\n",
    "    output = RunnableLambda(parse_or_fix).invoke(\n",
    "        \"{foo: bar\", {\"tags\": [\"my-tag\"], \"callbacks\": [cb]}\n",
    "    )\n",
    "    print(output)\n",
    "    print(cb)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Streaming"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "lion, tiger, wolf, gorilla, panda"
     ]
    }
   ],
   "source": [
    "from typing import Iterator, List\n",
    "\n",
    "prompt = ChatPromptTemplate.from_template(\n",
    "    \"Write a comma-separated list of 5 animals similar to: {animal}. Do not include numbers\"\n",
    ")\n",
    "model = ChatOpenAI(temperature=0.0)\n",
    "\n",
    "str_chain = prompt | model | StrOutputParser()\n",
    "\n",
    "for chunk in str_chain.stream({\"animal\": \"bear\"}):\n",
    "    print(chunk, end=\"\", flush=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'lion, tiger, wolf, gorilla, elephant'"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "str_chain.invoke({\"animal\": \"bear\"})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['lion']\n",
      "['tiger']\n",
      "['wolf']\n",
      "['gorilla']\n",
      "['panda']\n"
     ]
    }
   ],
   "source": [
    "# This is a custom parser that splits an iterator of llm tokens\n",
    "# into a list of strings separated by commas\n",
    "def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:\n",
    "    # hold partial input until we get a comma\n",
    "    buffer = \"\"\n",
    "    for chunk in input:\n",
    "        # add current chunk to buffer\n",
    "        buffer += chunk\n",
    "        # while there are commas in the buffer\n",
    "        while \",\" in buffer:\n",
    "            # split buffer on comma\n",
    "            comma_index = buffer.index(\",\")\n",
    "            # yield everything before the comma\n",
    "            yield [buffer[:comma_index].strip()]\n",
    "            # save the rest for the next iteration\n",
    "            buffer = buffer[comma_index + 1 :]\n",
    "    # yield the last chunk\n",
    "    yield [buffer.strip()]\n",
    "\n",
    "list_chain = str_chain | split_into_list\n",
    "for chunk in list_chain.stream({\"animal\": \"bear\"}):\n",
    "    print(chunk, flush=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['lion', 'tiger', 'wolf', 'gorilla', 'panda']"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "list_chain.invoke({\"animal\": \"bear\"})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Async version"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['koala']\n",
      "['panda']\n",
      "['grizzly']\n",
      "['polar bear']\n",
      "['black bear']\n"
     ]
    }
   ],
   "source": [
    "from typing import AsyncIterator\n",
    "\n",
    "\n",
    "async def asplit_into_list(\n",
    "    input: AsyncIterator[str],\n",
    ") -> AsyncIterator[List[str]]:  # async def\n",
    "    buffer = \"\"\n",
    "    async for (\n",
    "        chunk\n",
    "    ) in input:  # `input` is a `async_generator` object, so use `async for`\n",
    "        buffer += chunk\n",
    "        while \",\" in buffer:\n",
    "            comma_index = buffer.index(\",\")\n",
    "            yield [buffer[:comma_index].strip()]\n",
    "            buffer = buffer[comma_index + 1 :]\n",
    "    yield [buffer.strip()]\n",
    "\n",
    "\n",
    "list_chain = str_chain | asplit_into_list\n",
    "async for chunk in list_chain.astream({\"animal\": \"bear\"}):\n",
    "    print(chunk, flush=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['lion', 'tiger', 'wolf', 'raccoon', 'gorilla']"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "await list_chain.ainvoke({\"animal\": \"bear\"})"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "langchain0_1",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
